import collections
import logging
import os
import time

from . import config
from . import engines
from .. import create_engine
from .. import exc
from .. import text
from ..engine import url as sa_url
from ..util import compat


# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)

# Follower identifier placeholder, initialised to None.  Nothing in the
# visible part of this module reads or reassigns it -- presumably it is set
# from outside (e.g. by the test runner plugin); TODO confirm.
FOLLOWER_IDENT = None


class register(object):
    """Callable dispatch table keyed by database backend name.

    ``register.init(fn)`` creates a new table with ``fn`` stored under the
    wildcard key ``"*"``.  ``for_db(name)`` returns a decorator that stores
    a backend-specific function under ``name``.  Both return the
    ``register`` instance itself, so the decorated module-level name ends
    up bound to the dispatcher rather than to the plain function.

    Calling the instance picks the function registered for the backend
    name of the URL derived from the first argument, falling back to the
    ``"*"`` entry.
    """

    def __init__(self):
        # backend name (or "*") -> implementation function
        self.fns = {}

    @classmethod
    def init(cls, fn):
        """Return a fresh dispatcher whose ``"*"`` fallback is ``fn``."""
        dispatcher = register()
        as_fallback = dispatcher.for_db("*")
        return as_fallback(fn)

    def for_db(self, dbname):
        """Return a decorator registering a function under ``dbname``.

        The decorator returns this dispatcher, not the decorated function.
        """

        def decorate(fn):
            self.fns[dbname] = fn
            return self

        return decorate

    @staticmethod
    def _resolve_url(cfg):
        # ``cfg`` may be a URL string, a URL object, or any object that
        # exposes the URL as ``cfg.db.url``.
        if isinstance(cfg, compat.string_types):
            return sa_url.make_url(cfg)
        if isinstance(cfg, sa_url.URL):
            return cfg
        return cfg.db.url

    def __call__(self, cfg, *arg):
        """Invoke the implementation matching the backend of ``cfg``.

        ``cfg`` and ``*arg`` are passed through unchanged.  A ``KeyError``
        results when neither the backend nor ``"*"`` is registered.
        """
        backend = self._resolve_url(cfg).get_backend_name()
        key = backend if backend in self.fns else "*"
        return self.fns[key](cfg, *arg)


def create_follower_db(follower_ident):
    """Create the database named ``follower_ident`` for each config
    yielded by ``_configs_for_db_operation()``.

    Each creation is logged and then delegated to the backend-dispatched
    ``_create_db`` with the config, its engine (``cfg.db``) and the
    identifier.
    """
    for cfg in _configs_for_db_operation():
        engine = cfg.db
        log.info("CREATE database %s, URI %r", follower_ident, engine.url)
        _create_db(cfg, engine, follower_ident)


def configure_follower(follower_ident):
    """Apply ``_configure_follower`` to every registered config.

    Iterates ``config.Config.all_configs()`` and hands each config, along
    with ``follower_ident``, to the backend-dispatched hook.
    """
    every_config = config.Config.all_configs()
    for cfg in every_config:
        _configure_follower(cfg, follower_ident)


def setup_config(db_url, options, file_config, follower_ident):
    """Build a testing engine for ``db_url`` and register a config for it.

    When ``follower_ident`` is truthy the URL is first rewritten through
    ``_follower_url_from_main`` and the resulting config is additionally
    passed through ``_configure_follower``.

    Returns the object produced by ``config.Config.register``.
    """
    if follower_ident:
        db_url = _follower_url_from_main(db_url, follower_ident)

    # backend hooks may add engine options to this dict in place
    db_opts = {}
    _update_db_opts(db_url, db_opts)

    engine = engines.testing_engine(db_url, db_opts)
    _post_configure_engine(db_url, engine, follower_ident)

    # open and immediately close one connection -- presumably a
    # connectivity check before the config is registered
    connection = engine.connect()
    connection.close()

    registered = config.Config.register(
        engine, db_opts, options, file_config
    )
    if not follower_ident:
        return registered

    _configure_follower(registered, follower_ident)
    return registered


def drop_follower_db(follower_ident):
    """Drop the database named ``follower_ident`` for each config yielded
    by ``_configs_for_db_operation()``.

    Each drop is logged and then delegated to the backend-dispatched
    ``_drop_db`` with the config, its engine (``cfg.db``) and the
    identifier.
    """
    for cfg in _configs_for_db_operation():
        engine = cfg.db
        log.info("DROP database %s, URI %r", follower_ident, engine.url)
        _drop_db(cfg, engine, follower_ident)


def _configs_for_db_operation():
    """Yield one config per distinct (backend, username, host, database).

    Every config's engine is disposed before the first config is yielded
    and again once iteration has run to completion.  The trailing dispose
    pass only happens if the generator is fully consumed.
    """

    def dispose_all():
        for each_cfg in config.Config.all_configs():
            each_cfg.db.dispose()

    seen = set()
    dispose_all()

    for cfg in config.Config.all_configs():
        url = cfg.db.url
        key = (url.get_backend_name(), url.username, url.host, url.database)

        if key in seen:
            continue
        yield cfg
        seen.add(key)

    dispose_all()


@register.init
def _create_db(cfg, eng, ident):
    """Fallback database-creation routine: always raises.

    Used for any backend without its own ``_create_db.for_db(...)``
    implementation.

    :raises NotImplementedError: unconditionally, naming ``eng.url``.
    """
    message = "no DB creation routine for cfg: %s" % eng.url
    raise NotImplementedError(message)


@register.init
def _drop_db(cfg, eng, ident):
    """Fallback database-drop routine: always raises.

    Used for any backend without its own ``_drop_db.for_db(...)``
    implementation.

    :raises NotImplementedError: unconditionally, naming ``eng.url``.
    """
    message = "no DB drop routine for cfg: %s" % eng.url
    raise NotImplementedError(message)


@register.init
def _update_db_opts(db_url, db_opts):
    """Fallback hook for adjusting engine options: does nothing.

    Backend-specific versions registered via ``for_db`` may modify the
    ``db_opts`` dict in place.
    """


@register.init
def _configure_follower(cfg, ident):
    """Fallback hook for follower-specific config changes: does nothing.

    Backend-specific versions registered via ``for_db`` receive the config
    object and the follower identifier.
    """


@register.init
def _post_configure_engine(url, engine, follower_ident):
    """Fallback hook run on a freshly built engine: does nothing.

    Backend-specific versions registered via ``for_db`` receive the URL,
    the engine and the follower identifier (possibly falsy).
    """


@register.init
def _follower_url_from_main(url, ident):
    """Fallback follower-URL builder: point the URL at database ``ident``.

    The argument is passed through ``sa_url.make_url`` and the resulting
    object's ``database`` attribute is overwritten with ``ident``.
    NOTE(review): if ``make_url`` returns the same object when handed a
    URL instance, the caller's URL is modified in place -- confirm.
    """
    follower_url = sa_url.make_url(url)
    follower_url.database = ident
    return follower_url


@_update_db_opts.for_db("mssql")
def _mssql_update_db_opts(db_url, db_opts):
    """For mssql, set the ``legacy_schema_aliasing`` engine option to
    False in ``db_opts`` (modified in place)."""
    db_opts.update(legacy_schema_aliasing=False)


@_follower_url_from_main.for_db("sqlite")
def _sqlite_follower_url_from_main(url, ident):
    """Build the follower URL for sqlite.

    A URL with no database or with ``:memory:`` is returned as is (after
    ``make_url``).  Any other URL is replaced by a file-based URL for
    ``<ident>.db``.
    """
    url = sa_url.make_url(url)
    if url.database and url.database != ":memory:":
        return sa_url.make_url("sqlite:///%s.db" % ident)
    return url


@_post_configure_engine.for_db("sqlite")
def _sqlite_post_configure_engine(url, engine, follower_ident):
    """Attach a ``test_schema`` database on every new sqlite connection.

    Registers a "connect" event listener on ``engine`` that executes an
    ATTACH DATABASE statement on the raw DBAPI connection.  The attached
    file is ``test_schema.db``, or ``<follower_ident>_test_schema.db`` when
    a follower identifier is given.
    """
    from sqlalchemy import event

    # a file database is attached in every case; an in-memory database
    # acts kind of strangely when used as an attached database
    if follower_ident:
        attach_stmt = (
            'ATTACH DATABASE "%s_test_schema.db" AS test_schema'
            % follower_ident
        )
    else:
        attach_stmt = 'ATTACH DATABASE "test_schema.db" AS test_schema'

    @event.listens_for(engine, "connect")
    def connect(dbapi_connection, connection_record):
        dbapi_connection.execute(attach_stmt)


@_create_db.for_db("postgresql")
def _pg_create_db(cfg, eng, ident):
    """Create PostgreSQL database ``ident`` from a template database.

    The template is ``cfg.options.postgresql_templatedb``; when that is
    falsy, the database the connection is currently using is the template.
    Any existing database called ``ident`` is dropped first on a
    best-effort basis.

    CREATE DATABASE is attempted up to three times.  Each
    ``OperationalError`` counts as a failed attempt; when the message
    reports the template as "accessed by other users" the function sleeps
    half a second before retrying, otherwise it retries immediately.

    :raises sqlalchemy.exc.OperationalError: when the third attempt fails.
    """
    template_db = cfg.options.postgresql_templatedb

    with eng.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
        try:
            _pg_drop_db(cfg, conn, ident)
        except Exception as drop_err:
            # best effort only: the drop fails whenever the database does
            # not exist yet, which is the normal case
            log.debug(
                "ignoring failure dropping %s before create: %s",
                ident,
                drop_err,
            )
        if not template_db:
            template_db = conn.scalar("select current_database()")

        attempt = 0
        while True:
            try:
                conn.execute(
                    "CREATE DATABASE %s TEMPLATE %s" % (ident, template_db)
                )
            except exc.OperationalError as err:
                attempt += 1
                if attempt >= 3:
                    raise
                if "accessed by other users" in str(err):
                    log.info(
                        "Waiting to create %s, URI %r, "
                        "template DB %s is in use sleeping for .5",
                        ident,
                        eng.url,
                        template_db,
                    )
                    time.sleep(0.5)
            else:
                # any other exception type propagates on its own; no
                # catch-and-reraise clause is needed
                break


@_create_db.for_db("mysql")
def _mysql_create_db(cfg, eng, ident):
    """Create the three MySQL databases for follower ``ident``.

    These are ``<ident>``, ``<ident>_test_schema`` and
    ``<ident>_test_schema_2``, each with character set utf8mb4.  Existing
    ones are dropped first; any failure of that drop is ignored.
    """
    create_templates = (
        "CREATE DATABASE %s CHARACTER SET utf8mb4",
        "CREATE DATABASE %s_test_schema CHARACTER SET utf8mb4",
        "CREATE DATABASE %s_test_schema_2 CHARACTER SET utf8mb4",
    )
    with eng.connect() as conn:
        try:
            _mysql_drop_db(cfg, conn, ident)
        except Exception:
            pass

        for template in create_templates:
            conn.execute(template % ident)


@_configure_follower.for_db("mysql")
def _mysql_configure_follower(config, ident):
    """Point the config's ``test_schema`` and ``test_schema_2`` at the
    follower's ``<ident>_test_schema`` / ``<ident>_test_schema_2``."""
    schema_one = "%s_test_schema" % ident
    schema_two = "%s_test_schema_2" % ident
    config.test_schema = schema_one
    config.test_schema_2 = schema_two


@_create_db.for_db("sqlite")
def _sqlite_create_db(cfg, eng, ident):
    """Creation hook for sqlite: intentionally does nothing."""


@_drop_db.for_db("postgresql")
def _pg_drop_db(cfg, eng, ident):
    """Drop PostgreSQL database ``ident``.

    On an AUTOCOMMIT connection, first terminates the current user's other
    backends connected to that database, then issues DROP DATABASE.
    """
    terminate_other_backends = text(
        "select pg_terminate_backend(pid) from pg_stat_activity "
        "where usename=current_user and pid != pg_backend_pid() "
        "and datname=:dname"
    )
    autocommit_conn = eng.connect().execution_options(
        isolation_level="AUTOCOMMIT"
    )
    with autocommit_conn as conn:
        conn.execute(terminate_other_backends, dname=ident)
        conn.execute("DROP DATABASE %s" % ident)


@_drop_db.for_db("sqlite")
def _sqlite_drop_db(cfg, eng, ident):
    """Delete the sqlite files belonging to follower ``ident``.

    Both ``<ident>.db`` (the main follower database file) and
    ``<ident>_test_schema.db`` (the attached schema file) are removed when
    present.  A file that does not exist is skipped rather than raising;
    the main file is never created when the follower uses an in-memory
    database, for instance.
    """
    for path in ("%s.db" % ident, "%s_test_schema.db" % ident):
        # only remove what is actually there, so a partially provisioned
        # follower can still be cleaned up
        if os.path.exists(path):
            log.info("deleting SQLite database file: %s", path)
            os.remove(path)


@_drop_db.for_db("mysql")
def _mysql_drop_db(cfg, eng, ident):
    """Drop the three MySQL databases of follower ``ident``.

    The two schema databases go first, then ``<ident>`` itself.  The first
    failing statement propagates and the remaining drops are not attempted.
    """
    drop_templates = (
        "DROP DATABASE %s_test_schema",
        "DROP DATABASE %s_test_schema_2",
        "DROP DATABASE %s",
    )
    with eng.connect() as conn:
        for template in drop_templates:
            conn.execute(template % ident)


@_create_db.for_db("oracle")
def _oracle_create_db(cfg, eng, ident):
    """Create the Oracle users for follower ``ident``.

    Creates users ``<ident>``, ``<ident>_ts1`` and ``<ident>_ts2`` (all
    identified by "xe"), grants dba to ``<ident>`` and unlimited
    tablespace to all three.
    """
    # NOTE: make sure you've run "ALTER DATABASE default tablespace users" or
    # similar, so that the default tablespace is not "system"; reflection will
    # fail otherwise
    statement_templates = (
        "create user %s identified by xe",
        "create user %s_ts1 identified by xe",
        "create user %s_ts2 identified by xe",
        "grant dba to %s",
        "grant unlimited tablespace to %s",
        "grant unlimited tablespace to %s_ts1",
        "grant unlimited tablespace to %s_ts2",
    )
    with eng.connect() as conn:
        for template in statement_templates:
            conn.execute(template % (ident,))


@_configure_follower.for_db("oracle")
def _oracle_configure_follower(config, ident):
    """Point the config's ``test_schema`` and ``test_schema_2`` at the
    follower's ``<ident>_ts1`` / ``<ident>_ts2``."""
    schema_one = "%s_ts1" % ident
    schema_two = "%s_ts2" % ident
    config.test_schema = schema_one
    config.test_schema_2 = schema_two


def _ora_drop_ignore(conn, dbname):
    """Try to ``drop user <dbname> cascade``, tolerating database errors.

    :return: True when the drop succeeded; False when it raised
     ``exc.DatabaseError``, which is logged as a warning and not re-raised.
    """
    try:
        conn.execute("drop user %s cascade" % dbname)
    except exc.DatabaseError as err:
        log.warning("couldn't drop db: %s", err)
        return False
    else:
        log.info("Reaped db: %s", dbname)
        return True


@_drop_db.for_db("oracle")
def _oracle_drop_db(cfg, eng, ident):
    """Drop the Oracle users ``<ident>``, ``<ident>_ts1`` and
    ``<ident>_ts2``, ignoring (but logging) any that cannot be dropped."""
    # cx_Oracle seems to occasionally leak open connections when a large
    # suite is run, even if we confirm we have zero references to
    # connection objects.
    # while there is a "kill session" command in Oracle,
    # it unfortunately does not release the connection sufficiently.
    with eng.connect() as conn:
        for username in (ident, "%s_ts1" % ident, "%s_ts2" % ident):
            _ora_drop_ignore(conn, username)


@_update_db_opts.for_db("oracle")
def _oracle_update_db_opts(db_url, db_opts):
    """Engine-option hook for oracle: leaves ``db_opts`` untouched."""


def reap_dbs(idents_file):
    """Drop leftover follower databases listed in ``idents_file``.

    Each non-blank line of the file holds ``<db_name> <db_url>``, separated
    by a single space.  Entries are grouped by (backend name, host); for
    each group one of its URLs is used to connect and the group's database
    names are handed to the backend-specific reaper.  Only the "oracle" and
    "mssql" backends are reaped; other backends are ignored.

    :param idents_file: path of the identifiers file to read.
    :raises ValueError: if a non-blank line does not consist of exactly two
     space-separated fields.
    """
    log.info("Reaping databases...")

    urls = collections.defaultdict(set)
    idents = collections.defaultdict(set)

    with open(idents_file) as file_:
        for line in file_:
            line = line.strip()
            if not line:
                # blank or whitespace-only lines carry no entry; skipping
                # them avoids an unpacking error on the split below
                continue
            db_name, db_url = line.split(" ")
            url_obj = sa_url.make_url(db_url)
            url_key = (url_obj.get_backend_name(), url_obj.host)
            urls[url_key].add(db_url)
            idents[url_key].add(db_name)

    for url_key, candidate_urls in urls.items():
        backend = url_key[0]
        # any URL of the group will do; all share backend and host
        url = next(iter(candidate_urls))
        ident = idents[url_key]
        if backend == "oracle":
            _reap_oracle_dbs(url, ident)
        elif backend == "mssql":
            _reap_mssql_dbs(url, ident)


def _reap_oracle_dbs(url, idents):
    """Drop stale Oracle test users named in ``idents``.

    Connects to ``url`` and selects users whose name is LIKE 'TEST_%' and
    that have no row in v$session.  Of those (lower-cased), every name
    present in ``idents`` is dropped, together with its ``_ts1`` / ``_ts2``
    companions when they were also returned.  Names that themselves end in
    ``_ts1`` / ``_ts2`` are never used as a starting point.
    """
    log.info("db reaper connecting to %r", url)
    eng = create_engine(url)
    with eng.connect() as conn:

        log.info("identifiers in file: %s", ", ".join(idents))

        idle_users = conn.execute(
            "select u.username from all_users u where username "
            "like 'TEST_%' and not exists (select username "
            "from v$session where username=u.username)"
        )
        all_names = {username.lower() for (username,) in idle_users}

        to_drop = set()
        for name in all_names:
            if name.endswith(("_ts1", "_ts2")) or name not in idents:
                continue
            to_drop.add(name)
            for companion in ("%s_ts1" % name, "%s_ts2" % name):
                if companion in all_names:
                    to_drop.add(companion)

        total = len(to_drop)
        dropped = sum(
            1 for username in to_drop if _ora_drop_ignore(conn, username)
        )
        log.info(
            "Dropped %d out of %d stale databases detected", dropped, total
        )


@_follower_url_from_main.for_db("oracle")
def _oracle_follower_url_from_main(url, ident):
    """Build the follower URL for oracle.

    The URL (after ``make_url``) gets ``ident`` as its username and "xe"
    as its password; the other URL fields are left as they are.
    """
    follower_url = sa_url.make_url(url)
    follower_url.username = ident
    follower_url.password = "xe"
    return follower_url


@_create_db.for_db("mssql")
def _mssql_create_db(cfg, eng, ident):
    """Create SQL Server database ``ident`` with two test schemas.

    On an AUTOCOMMIT connection: creates the database, switches on
    ALLOW_SNAPSHOT_ISOLATION and READ_COMMITTED_SNAPSHOT, changes to the
    new database and creates ``test_schema`` and ``test_schema_2`` in it.
    """
    autocommit_conn = eng.connect().execution_options(
        isolation_level="AUTOCOMMIT"
    )
    with autocommit_conn as conn:
        statements = (
            "create database %s" % ident,
            "ALTER DATABASE %s SET ALLOW_SNAPSHOT_ISOLATION ON" % ident,
            "ALTER DATABASE %s SET READ_COMMITTED_SNAPSHOT ON" % ident,
            "use %s" % ident,
            "create schema test_schema",
            "create schema test_schema_2",
        )
        for statement in statements:
            conn.execute(statement)


@_drop_db.for_db("mssql")
def _mssql_drop_db(cfg, eng, ident):
    """Drop SQL Server database ``ident`` on an AUTOCOMMIT connection.

    Delegates to ``_mssql_drop_ignore``, so a database error during the
    drop is logged instead of raised.
    """
    autocommit_conn = eng.connect().execution_options(
        isolation_level="AUTOCOMMIT"
    )
    with autocommit_conn as conn:
        _mssql_drop_ignore(conn, ident)


def _mssql_drop_ignore(conn, ident):
    """Try to ``drop database <ident>``, tolerating database errors.

    :return: True when the drop succeeded; False when it raised
     ``exc.DatabaseError``, which is logged as a warning and not re-raised.
    """
    # Sessions still using the database are deliberately not killed here:
    # typically when this happens, we can't KILL the session anyway,
    # so let the cleanup process drop the DBs
    # for row in conn.execute(
    #     "select session_id from sys.dm_exec_sessions "
    #        "where database_id=db_id('%s')" % ident):
    #    log.info("killing SQL server sesssion %s", row['session_id'])
    #    conn.execute("kill %s" % row['session_id'])
    try:
        conn.execute("drop database %s" % ident)
    except exc.DatabaseError as err:
        log.warning("couldn't drop db: %s", err)
        return False
    else:
        log.info("Reaped db: %s", ident)
        return True


def _reap_mssql_dbs(url, idents):
    """Drop stale SQL Server test databases named in ``idents``.

    Connects to ``url`` in AUTOCOMMIT mode and selects databases whose name
    is LIKE 'TEST_%' and that have no row in sys.dm_exec_sessions.  Of
    those (lower-cased), every name present in ``idents`` is dropped via
    ``_mssql_drop_ignore``.
    """
    log.info("db reaper connecting to %r", url)
    eng = create_engine(url)
    autocommit_conn = eng.connect().execution_options(
        isolation_level="AUTOCOMMIT"
    )
    with autocommit_conn as conn:

        log.info("identifiers in file: %s", ", ".join(idents))

        idle_dbs = conn.execute(
            "select d.name from sys.databases as d where name "
            "like 'TEST_%' and not exists (select session_id "
            "from sys.dm_exec_sessions "
            "where database_id=d.database_id)"
        )
        all_names = {dbname.lower() for (dbname,) in idle_dbs}
        to_drop = {name for name in all_names if name in idents}

        total = len(to_drop)
        dropped = sum(
            1 for dbname in to_drop if _mssql_drop_ignore(conn, dbname)
        )
        log.info(
            "Dropped %d out of %d stale databases detected", dropped, total
        )
